package com.zenitera.bigdata.tableapiflinksql;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Flink SQL - Group Windows
 * 滑动窗口 - HOP(time_attr, interval, interval)
 */
public class Flink06_FlinkSQL_GroupWindow02 {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        tableEnv.executeSql("create table sensor(" +
                "id string," +
                "ts bigint," +
                "vc int," +
                "t as to_timestamp(from_unixtime(ts/1000,'yyyy-MM-dd HH:mm:ss'))," +
                "watermark for t as t - interval '5' second)" +
                "with(" +
                "'connector' = 'filesystem'," +
                "'path' = 'input/sensor.txt'," +
                "'format' = 'csv'" +
                ")");

        tableEnv.sqlQuery("SELECT " +
                " id, " +
                "  hop_start(t, INTERVAL '10' minute, INTERVAL '1' hour) as WatermarkStart,  " +
                "  hop_end(t, INTERVAL '10' minute, INTERVAL '1' hour) as WatermarkEnd,  " +
                " SUM(vc) vc_sum_value " +
                " from sensor " +
                "GROUP BY hop(t, INTERVAL '10' minute, INTERVAL '1' hour), id")
                .execute()
                .print();

    }
}
/*
input/sensor.txt
sensor_1,1,10
sensor_1,2,20
sensor_2,4,30
sensor_1,4,400
sensor_2,5,50
sensor_2,6,60
#####################################################
+----+--------------+-------------------------+-------------------------+--------------+
| op |           id |          WatermarkStart |            WatermarkEnd | vc_sum_value |
+----+--------------+-------------------------+-------------------------+--------------+
| +I |     sensor_1 | 1970-01-01 07:10:00.000 | 1970-01-01 08:10:00.000 |          430 |
| +I |     sensor_2 | 1970-01-01 07:10:00.000 | 1970-01-01 08:10:00.000 |          140 |
| +I |     sensor_2 | 1970-01-01 07:20:00.000 | 1970-01-01 08:20:00.000 |          140 |
| +I |     sensor_1 | 1970-01-01 07:20:00.000 | 1970-01-01 08:20:00.000 |          430 |
| +I |     sensor_2 | 1970-01-01 07:30:00.000 | 1970-01-01 08:30:00.000 |          140 |
| +I |     sensor_1 | 1970-01-01 07:30:00.000 | 1970-01-01 08:30:00.000 |          430 |
| +I |     sensor_2 | 1970-01-01 07:40:00.000 | 1970-01-01 08:40:00.000 |          140 |
| +I |     sensor_1 | 1970-01-01 07:40:00.000 | 1970-01-01 08:40:00.000 |          430 |
| +I |     sensor_2 | 1970-01-01 07:50:00.000 | 1970-01-01 08:50:00.000 |          140 |
| +I |     sensor_1 | 1970-01-01 07:50:00.000 | 1970-01-01 08:50:00.000 |          430 |
| +I |     sensor_1 | 1970-01-01 08:00:00.000 | 1970-01-01 09:00:00.000 |          430 |
| +I |     sensor_2 | 1970-01-01 08:00:00.000 | 1970-01-01 09:00:00.000 |          140 |
+----+--------------+-------------------------+-------------------------+--------------+
 */